#import library
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import month, year, hour, to_date, col, udf, dayofmonth, dayofyear, to_timestamp,from_unixtime, unix_timestamp
from pyspark.sql.types import DateType
from datetime import datetime
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
%matplotlib inline
import plotly
import plotly.plotly as py
plotly.tools.set_credentials_file(username='akbar0102', api_key='HhbZVjpsfob4yDF1aGpd')
import plotly.graph_objs as go
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)
#buat objek spark
spark = SparkSession.builder.appName('uk_accident').getOrCreate()
#sc = SparkContext()
sqlContext = SQLContext(spark)
#read data csv
acc = spark.read.csv('raw_data/Acc.csv', inferSchema=True, header=True)
veh = spark.read.csv('raw_data/Veh.csv', inferSchema=True, header=True)
cas = spark.read.csv('raw_data/Cas.csv', inferSchema=True, header=True)
#lihat jumlah kolom dan rekord
print('Cas',cas.count(), len(cas.columns))
print('Veh',veh.count(), len(veh.columns))
print('Acc',acc.count(), len(acc.columns))
Analisis data accident UK 2017
acc.printSchema()
#label district
district = spark.read.csv('raw_data/label_accident/district.csv', inferSchema=True, header=True, sep=';')
#label weather
weather = spark.read.csv('raw_data/label_accident/weather.csv', inferSchema=True, header=True, sep=';')
#register sql
district.registerTempTable('district')
acc.registerTempTable('acc')
weather.registerTempTable('weather')
#jumlah kecelakaan yang paling banyak terjadi untuk setiap distrik
acc.groupBy('Local_Authority_(District)').count().orderBy('count', ascending=False).show()
#jumlah korban kecelakaan setiap district secara keseluruhan
dis_sum = sqlContext.sql("SELECT `Local_Authority_(District)`, label, sum(Number_of_Casualties) as sum_casualties\
FROM acc,district \
WHERE `Local_Authority_(District)` == code \
GROUP BY `Local_Authority_(District)`, label \
ORDER BY sum_casualties DESC LIMIT 10")
dis_sum.show()
dis_sum = dis_sum.toPandas()
data = [go.Bar(
x=dis_sum['label'],
y=dis_sum['sum_casualties'],
text=dis_sum['sum_casualties'],
textposition='auto'
)]
py.iplot(data, filename='district-accident')
Grafik distrik dengan jumlah korban terbanyak, yaitu paling tinggi adalah Birmingham
#jumlah casualties setiap district berdasarkan severity(fatal,slight,serious)
district_cas = sqlContext.sql("SELECT `Local_Authority_(District)`, district.label as district, sev.label as severity, sum(`count(Casualty_Severity)`) as casualty \
FROM acc,district,sev \
WHERE `Local_Authority_(District)` == code and acc.Accident_Index == sev.Accident_Index \
GROUP BY `Local_Authority_(District)`, district.label, sev.label, Casualty_Severity \
ORDER BY district.label")
district_cas.show()
Tabel yang menunjukan jumlah korban berdasarkan tingkat keparahannya untuk setiap distrik
#accident berdasarkan kondisi cuaca
sqlContext.sql("SELECT Weather_Conditions, label, count(Weather_Conditions) \
FROM acc, weather \
WHERE acc.Weather_Conditions == weather.code AND acc.Weather_Conditions > 0\
GROUP BY Weather_Conditions, label \
ORDER BY Weather_Conditions").show()
#accident berdasarkan kondisi kecahayaan
acc.filter(acc['Light_Conditions'] > 0).groupBy('Light_Conditions').count().orderBy('Light_Conditions').show()
#accident berdasarkan kondisi jalan
acc.filter(acc['Road_Surface_Conditions'] > 0).groupBy('Road_Surface_Conditions').count().orderBy('Road_Surface_Conditions').show()
#untuk analisis tanggal, sebelumnya tipe data string -> Date
func = udf (lambda x: datetime.strptime(x, '%d/%m/%Y'), DateType())
df = acc.withColumn('new_date', func(col('Date')))
#tambah kolom baru, diambil dari date yaitu untuk kolom bulan dan tanggal
df = df.withColumn('month', month(df['new_date']))
df = df.withColumn('day', dayofmonth(df['new_date']))
#kecelakaan setiap bulan dalam tahun 2017
acc_mon = df.groupBy('month').count().orderBy('month')
acc_mon = acc_mon.toPandas()
data = [go.Bar(
x=['January', 'February', 'March', 'April', 'May', 'June', 'July', 'August', 'September', 'October', 'November', 'December'],
y=acc_mon['count'],
text=acc_mon['count'],
textposition='auto'
)]
py.iplot(data, filename='accident-month')
Grafik kecelakaan yang terjadi dalam setahun per bulan
#kecelakaan berdasarkan nama hari dalam seminggu setiap harinya selama tahun 2017
dow = acc.groupBy('Day_of_Week').count().orderBy('Day_of_Week')
#ubah ke pandas
dow = dow.toPandas()
data = [go.Bar(
x=['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'],
y=dow['count'],
text=dow['count'],
textposition='auto'
)]
py.iplot(data, filename='day-of-week')
Grafik jumlah kecelakaan yang terjadi dalam setahun untuk setiap harinya
#ambil data kecelakaan berdasarkan waktu untuk setiap tanggal dalam sebulan, apabila hendak berdasarkan bulan keberapa menggunakan filter
#tes = df.filter((df['month'] == 1)).groupBy('Time', 'day', 'month').count().orderBy('month','day', 'Time')
tes = df.groupBy('Time', 'day', 'month').count().orderBy('month','day', 'Time')
#kecelakaan setiap waktu pada setiap tanggal dan setiap bulan dalam tahun 2017
tes.show()
#kecelakaan dalam 24 jam selama tahun 2017
tim = tes.filter((tes['Time'] != 'null') & (tes['Time'] != 'NULL')).groupBy(hour('Time').alias('hour')).sum('count').orderBy('hour')
tim = tim.toPandas()
tim.head()
trace = go.Bar(
x=tim['hour'],
y=tim['sum(count)'],
text=tim['sum(count)'],
textposition='auto'
)
data = [trace]
layout = go.Layout(
title='Accident Hourly 2017',
xaxis=dict(tickmode='linear')
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='hour-accident')
Grafik kecelakaan yang terjadi selama setahun per waktu terjadinya
#kecelakaan pada setiap tanggal dalam setiap bulan selama tahun 2017
#coba untuk bulan 1 dalam 30 hari
tgl = tes.filter(tes['month'] == 1).groupBy('day', 'month').sum('count').orderBy('day')
tgl = tgl.toPandas()
trace = go.Bar(
x=tgl['day'],
y=tgl['sum(count)'],
text=tgl['sum(count)'],
textposition='auto'
)
data = [trace]
layout = go.Layout(
title='Accident Monthly - January 2017',
xaxis=dict(tickmode='linear')
)
fig = go.Figure(data=data, layout=layout)
py.iplot(fig, filename='monthly-accident')
Grafik kecelakaan yang terjadi pada tahun 2017 di bulan januari selama satu bulan
#filter lat dan lon untuk melihat kecelakaan yang terjadi pada jalan Motorways dan A(M)
maps = acc.filter(
(acc['Latitude']!='null') & (acc['Latitude']!='NULL') &
(acc['Longitude']!='null') & (acc['Longitude']!='NULL')).groupBy('Latitude','Longitude','Local_Authority_(District)','Accident_Severity').count().orderBy('Latitude','Longitude')
maps = maps.toPandas()
maps.head()
data = [ dict(
type = 'scattermapbox',
lon = maps['Longitude'],
lat = maps['Latitude'],
locationmode='United Kingdom',
marker = dict(
size = 2,
cmin = maps['Accident_Severity'].min(),
color = maps['Accident_Severity'],
cmax = maps['Accident_Severity'].max(),
colorbar=dict(
title="Accident_Severity"
),
colorscale= [[0, "rgb(0,255,0)"], [0.5, "rgb(255,128,0)"], [1, "rgb(255,0,2)"]]
),
text = maps['Local_Authority_(District)']
)]
layout = dict(
height = 900,
width = 900,
title = 'UK Accident 2017',
colorbar = True,
mapbox= dict(
accesstoken="pk.eyJ1IjoiYWtiYXIwMTAyIiwiYSI6ImNqcXFtc2c0MTBjaWIzeG8xb25xbmhid3oifQ.0Xv30-rNVlvIzDy0mZoROg",
center= dict(
lat= 54,
lon= -4
),
zoom= 4.7
)
)
fig = dict( data=data, layout=layout ,mapboxAccessToken= 'HhbZVjpsfob4yDF1aGpd' )
py.iplot( fig, validate=False, filename='uk-accident' )
Map berdasarkan keparahan kecelakaan
cek = sqlContext.sql("SELECT `Local_Authority_(District)`, label, Latitude, Longitude, 1st_Road_Class \
FROM acc, district \
WHERE `Local_Authority_(District)` == code AND 1st_Road_Class == 1 OR 1st_Road_Class == 2 \
AND Latitude != 'null' AND 'Latitude'!='NULL' AND Longitude != 'null' AND 'Longitude'!='NULL' \
GROUP BY `Local_Authority_(District)`, label, Latitude, Longitude, 1st_Road_Class \
ORDER BY label")
cek= cek.toPandas()
data = [ dict(
type = 'scattermapbox',
lon = cek['Longitude'],
lat = cek['Latitude'],
locationmode='United Kingdom',
marker = dict(
size = 2,
cmin = 0,
color = cek['1st_Road_Class'],
cmax = cek['1st_Road_Class'].max(),
colorbar=dict(
title="1st_Road_Class"
),
colorscale= [[0, "rgb(0,255,0)"], [0.5, "rgb(255,128,0)"], [1, "rgb(255,0,2)"]]
),
text = cek['label']
)]
layout = dict(
height = 900,
width = 900,
title = 'UK Accident 2017',
colorbar = True,
mapbox= dict(
accesstoken="pk.eyJ1IjoiYWtiYXIwMTAyIiwiYSI6ImNqcXFtc2c0MTBjaWIzeG8xb25xbmhid3oifQ.0Xv30-rNVlvIzDy0mZoROg",
center= dict(
lat= 54,
lon= -4
),
zoom= 4.7
)
)
fig = dict( data=data, layout=layout ,mapboxAccessToken= 'HhbZVjpsfob4yDF1aGpd' )
iplot( fig, validate=False, filename='uk-accident-district')
Map berdasarkan kecelakaan yang terjadi pada 1st_Road Motorways dan A(M)
#jumlah kendaraan yang terlibat pada setiap kategori kecelakaan
acc.groupBy('Accident_Severity').sum('Number_of_Vehicles').show()
#jumlah kecelakaan berdasarkan keparahannya
acc.groupBy('Accident_Severity').count().show()
Analisis data casualty UK 2017
cas.printSchema()
#read referensi label
age_band = spark.read.csv('raw_data/label_casualty/age_band.csv', inferSchema=True, header=True, sep=';')
cas_severity = spark.read.csv('raw_data/label_casualty/cas_severity.csv', inferSchema=True, header=True, sep=';')
#register tabel untuk sql
cas.registerTempTable('cas')
age_band.registerTempTable('age_band')
cas_severity.registerTempTable('cas_severity')
#sql jumlah casualty berdasrkan rentang usia
res = sqlContext.sql("SELECT Age_Band_of_Casualty, label, count(Age_Band_of_Casualty) \
FROM cas,age_band \
WHERE Age_Band_of_Casualty == code \
GROUP BY Age_Band_of_Casualty, label")
res.orderBy('Age_Band_of_Casualty').show()
# jumlah casualty severity berdasarkan index kecelakaan, untuk setiap keparahan kecelakaan (slight, serious, fatal)
sev = sqlContext.sql("SELECT Accident_Index ,Casualty_Severity, label, count(Casualty_Severity) \
FROM cas, cas_severity \
WHERE Casualty_Severity == code \
GROUP BY Casualty_Severity, label, Accident_Index \
ORDER BY Accident_Index, Casualty_Severity")
#register tabel sql
sev.registerTempTable('sev')
sev.show()
#jumlah casualty berdasarkan rentang usia tanpa label, hanya kode yang merepresentasikan rentang usia
cas.filter(cas['Age_Band_of_Casualty'] > 0).groupBy('Age_Band_of_Casualty').count().orderBy('Age_Band_of_Casualty').show()
#jumlah casualty berdasarkan jenis kelamin
jk = cas.filter(cas['Sex_of_Casualty'] > 0).groupBy('Sex_of_Casualty').count()
pdf1 = jk.toPandas()
#pdf1.plot(kind='bar',x='Sex_of_Casualty',y='count')
#pdf1
pdf1['count'][0]
data = [go.Bar(
x=['Male','Female'],
y=pdf1['count']
)]
iplot(data, filename='sex-casuality')
Grafik total secara keseluruhan jenis kelamin pria dan wanita yang menjadi korban pada saat kecelakaan
#jumlah casualty berdasarkan jenis kelamin, untuk setiap rentang usia
male = cas.filter((cas['Sex_of_Casualty'] > 0) & (cas['Age_Band_of_Casualty'] > 0) & (cas['Sex_of_Casualty'] == 1)).groupBy('Sex_of_Casualty', 'Age_Band_of_Casualty').count().orderBy('Sex_of_Casualty', 'Age_Band_of_Casualty')
#jumlah casualty berdasarkan jenis kelamin, untuk setiap rentang usia
female = cas.filter((cas['Sex_of_Casualty'] > 0) & (cas['Age_Band_of_Casualty'] > 0) & (cas['Sex_of_Casualty'] == 2)).groupBy('Sex_of_Casualty', 'Age_Band_of_Casualty').count().orderBy('Sex_of_Casualty', 'Age_Band_of_Casualty')
male = male.toPandas()
female = female.toPandas()
trace1 = go.Bar(
x=['0-5', '6-10', '11-15', '16-20', '21-25','26-35','36-45','46-55','56-65','66-75','Over 75'],
y=male['count'],
name='Male'
)
trace2 = go.Bar(
x=['0-5', '6-10', '11-15', '16-20', '21-25','26-35','36-45','46-55','56-65','66-75','Over 75'],
y=female['count'],
name='Female'
)
data = [trace1, trace2]
layout = go.Layout(
barmode='stack'
)
fig = go.Figure(data=data, layout=layout)
iplot(fig, filename='stacked-bar')
Grafik rentang usia yang menjadi korban kecelakaan berdasarkan jenis kelaminnya, dari grafik dapat dilihat yang paling banyak adalah rentang usia 26-35 tahun dan korban terbanyak untuk setiap rentang usianya berjenis kelamin laki-laki.
cas.groupBy('Casualty_Type','Casualty_IMD_Decile').count().orderBy('Casualty_Type', 'Casualty_IMD_Decile').show()
cas.groupBy('Casualty_Severity').count().show()
Analisis data vehicle UK 2017
veh.printSchema()
#read referensi label
veh_type = spark.read.csv('raw_data/label_vehicle/veh_type.csv', inferSchema=True, header=True, sep=';')
#register tabel sql
veh.registerTempTable('veh')
veh_type.registerTempTable('veh_type')
#kendaraan yang paling banyak terjadi kecelakaan
veh_type = sqlContext.sql("SELECT Vehicle_Type, label, count(Vehicle_Type) as number_of_accident \
FROM veh, veh_type \
WHERE Vehicle_Type == code \
GROUP BY Vehicle_Type, label \
ORDER BY number_of_accident DESC")
veh_type = veh_type.toPandas()
veh_type.head()
data = [go.Bar(
x=veh_type['label'],
y=veh_type['number_of_accident']
)]
iplot(data, filename='veh-type')
Grafik kendaraan yang paling banyak terjadi kecelakaan, yaitu terjadi paling banyak pada mobil, kemudian sepeda yang kemungkinan merupakan korban kecelakaan
#jenis kelamin pengemudi yang kecelakaan
sod = veh.filter(veh['Sex_of_Driver'] > 0).groupBy('Sex_of_Driver').count()
sod = sod.toPandas()
sod
data = [go.Bar(
x=['Male', 'Female', 'Unknown'],
y=sod['count'],
name=""
)]
iplot(data, filename='sex-of-driver')
Grafik jenis kelamin dari supir yang membawa kendaraan yang terjadi kecelakaan, lebih banyak pria daripada wanita